package kim.spider.avro.mapreduce.output;

import java.io.IOException;
import java.util.Arrays;

import kim.spider.io.MapAvroFile;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.mapred.Pair;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.Progressable;

/**
 * A {@link FileOutputFormat} that writes each task's key/value pairs through a
 * {@link MapAvroFile.Writer}, and offers static helpers to re-open the
 * resulting output directory and look up single entries in it.
 *
 * @param <K> the output key type
 * @param <V> the output value type
 */
public class AvroMapOutputFormat<K, V> extends FileOutputFormat<K, V> {

	/** The configuration key for Avro deflate level. */
	public static final String	DEFLATE_LEVEL_KEY			= "avro.mapred.deflate.level";

	/** The default deflate level. */
	public static final int			DEFAULT_DEFLATE_LEVEL	= 1;

	/**
	 * Enable output compression using the deflate codec and specify its level.
	 *
	 * @param job   the job whose configuration is updated
	 * @param level the deflate level stored under {@link #DEFLATE_LEVEL_KEY}
	 */
	public static void setDeflateLevel(Job job, int level) {
		FileOutputFormat.setCompressOutput(job, true);
		job.getConfiguration().setInt(DEFLATE_LEVEL_KEY, level);
	}

	/**
	 * Verifies that an output directory has been configured for the job.
	 *
	 * <p>
	 * Only the presence of the setting is checked. This override does not fetch
	 * delegation tokens for the output file system and does not reject an
	 * output directory that already exists.
	 *
	 * @param job the job being submitted
	 * @throws InvalidJobConfException if no output directory is set
	 */
	@Override
	public void checkOutputSpecs(JobContext job)
			throws FileAlreadyExistsException, IOException {
		Path outDir = getOutputPath(job);
		if (outDir == null) {
			throw new InvalidJobConfException("Output directory not set.");
		}
	}

	/**
	 * Creates a record writer that appends every pair to a
	 * {@link MapAvroFile.Writer} located at the task's default work file.
	 *
	 * <p>
	 * Key and value schemas are derived by reflection from the job's output key
	 * and value classes. The deflate level is read from
	 * {@link #DEFLATE_LEVEL_KEY}, falling back to {@link #DEFAULT_DEFLATE_LEVEL}.
	 *
	 * @param job the task attempt the writer is created for
	 * @return a writer that forwards to the underlying map-file writer
	 * @throws IOException if the underlying writer cannot be created
	 */
	@Override
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {

		final Configuration conf = job.getConfiguration();
		final Path path = getDefaultWorkFile(job, "");

		// Resolve the file system from the work file itself rather than from the
		// configured default, so output on a non-default file system is written
		// to the right place (same approach as getReaders below).
		final FileSystem fs = path.getFileSystem(conf);

		final Schema keySchema = ReflectData.get().getSchema(
				job.getOutputKeyClass());
		final Schema valueSchema = ReflectData.get().getSchema(
				job.getOutputValueClass());
		final int level = conf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);

		final MapAvroFile.Writer<K, V> writer = new MapAvroFile.Writer<K, V>(
				conf, fs, path.toString(), keySchema, valueSchema, job, level);

		return new RecordWriter<K, V>() {

			@Override
			public void write(K key, V value) throws IOException {
				writer.append(key, value);
			}

			@Override
			public void close(TaskAttemptContext context) throws IOException,
					InterruptedException {
				writer.close();
			}
		};
	}

	/**
	 * Open the output generated by this format.
	 *
	 * <p>
	 * Every entry of {@code dir} is opened as a {@link MapAvroFile.Reader},
	 * except entries named {@code logs} and entries whose name starts with
	 * {@code _} or {@code .} (for example a {@code _SUCCESS} marker), which are
	 * not map files. The readers are returned in sorted path order.
	 *
	 * @param dir  the job output directory
	 * @param conf the configuration used to resolve the file system and passed
	 *             to each reader
	 * @return one reader per remaining directory entry, in sorted path order
	 * @throws IOException if the directory cannot be listed or a reader cannot
	 *                     be opened
	 */
	public static MapAvroFile.Reader[] getReaders(Path dir, Configuration conf)
			throws IOException {
		FileSystem fs = dir.getFileSystem(conf);
		Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, new PathFilter() {

			@Override
			public boolean accept(Path path) {
				// The filter receives the full path, so the comparison has to
				// be made against the last path component only.
				String name = path.getName();
				return !(name.equals("logs") || name.startsWith("_")
						|| name.startsWith("."));
			}
		}));

		// sort names, so that hash partitioning works
		Arrays.sort(names);

		MapAvroFile.Reader[] parts = new MapAvroFile.Reader[names.length];
		for (int i = 0; i < names.length; i++) {
			parts[i] = new MapAvroFile.Reader(fs, names[i].toString(), conf);
		}
		return parts;
	}

	/**
	 * Get an entry from output generated by this class.
	 *
	 * @param readers     the readers returned by
	 *                    {@link #getReaders(Path, Configuration)}
	 * @param partitioner the partitioner used to pick the reader; it is asked
	 *                    for a partition among {@code readers.length}
	 * @param key         the key to look up
	 * @param value       passed to the partitioner only
	 * @return whatever the selected reader's {@code get(key)} returns
	 * @throws IOException if the lookup fails
	 */
	public static <K, V> V getEntry(MapAvroFile.Reader<K, V>[] readers,
			Partitioner<K, V> partitioner, K key, V value) throws IOException {
		int part = partitioner.getPartition(key, value, readers.length);
		return readers[part].get(key);
	}

}